# Copyright 2008-2021 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import fcntl
import errno
import gzip

import portage
from portage import os, _encodings, _unicode_encode
from portage.util.futures import asyncio
from portage.util.futures._asyncio.streams import _writer
from portage.util.futures.unix_events import _set_nonblocking
from _emerge.AbstractPollTask import AbstractPollTask


class PipeLogger(AbstractPollTask):
    """
    This can be used for logging output of a child process,
    optionally outputting to log_file_path and/or stdout_fd.  It can
    also monitor for EOF on input_fd, which may be used to detect
    termination of a child process. If log_file_path ends with
    '.gz' then the log file is written with compression.
    """

    __slots__ = ("input_fd", "log_file_path", "stdout_fd") + (
        "_io_loop_task",
        "_log_file",
        "_log_file_nb",
        "_log_file_real",
    )

    def _start(self):
        log_file_path = self.log_file_path
        if hasattr(log_file_path, "write"):
            self._log_file_nb = True
            self._log_file = log_file_path
            _set_nonblocking(self._log_file.fileno())
        elif log_file_path is not None:
            try:
                self._log_file = open(
                    _unicode_encode(
                        log_file_path, encoding=_encodings["fs"], errors="strict"
                    ),
                    mode="ab",
                )

                if log_file_path.endswith(".gz"):
                    self._log_file_real = self._log_file
                    self._log_file = gzip.GzipFile(
                        filename="", mode="ab", fileobj=self._log_file
                    )

                portage.util.apply_secpass_permissions(
                    log_file_path,
                    uid=portage.portage_uid,
                    gid=portage.portage_gid,
                    mode=0o660,
                )
            except FileNotFoundError:
                if self._was_cancelled():
                    self._async_wait()
                    return
                raise

        if isinstance(self.input_fd, int):
            self.input_fd = os.fdopen(self.input_fd, "rb", 0)

        fd = self.input_fd.fileno()

        fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)

        self._io_loop_task = asyncio.ensure_future(
            self._io_loop(self.input_fd), loop=self.scheduler
        )
        self._io_loop_task.add_done_callback(self._io_loop_done)
        self._registered = True

    def _cancel(self):
        self._unregister()
        if self.returncode is None:
            self.returncode = self._cancelled_returncode

    async def _io_loop(self, input_file):
        background = self.background
        stdout_fd = self.stdout_fd
        log_file = self._log_file
        fd = input_file.fileno()

        while True:
            buf = self._read_buf(fd)

            if buf is None:
                # not a POLLIN event, EAGAIN, etc...
                future = self.scheduler.create_future()
                self.scheduler.add_reader(fd, future.set_result, None)
                try:
                    await future
                finally:
                    # The loop and input file may have been closed.
                    if not self.scheduler.is_closed():
                        future.done() or future.cancel()
                        # Do not call remove_reader in cases where fd has
                        # been closed and then re-allocated to a concurrent
                        # coroutine as in bug 716636.
                        if not input_file.closed:
                            self.scheduler.remove_reader(fd)
                continue

            if not buf:
                # EOF
                return

            if not background and stdout_fd is not None:
                failures = 0
                stdout_buf = buf
                while stdout_buf:
                    try:
                        stdout_buf = stdout_buf[os.write(stdout_fd, stdout_buf) :]
                    except OSError as e:
                        if e.errno != errno.EAGAIN:
                            raise
                        del e
                        failures += 1
                        if failures > 50:
                            # Avoid a potentially infinite loop. In
                            # most cases, the failure count is zero
                            # and it's unlikely to exceed 1.
                            raise

                        # This means that a subprocess has put an inherited
                        # stdio file descriptor (typically stdin) into
                        # O_NONBLOCK mode. This is not acceptable (see bug
                        # #264435), so revert it. We need to use a loop
                        # here since there's a race condition due to
                        # parallel processes being able to change the
                        # flags on the inherited file descriptor.
                        # TODO: When possible, avoid having child processes
                        # inherit stdio file descriptors from portage
                        # (maybe it can't be avoided with
                        # PROPERTIES=interactive).
                        fcntl.fcntl(
                            stdout_fd,
                            fcntl.F_SETFL,
                            fcntl.fcntl(stdout_fd, fcntl.F_GETFL) ^ os.O_NONBLOCK,
                        )

            if log_file is not None:
                if self._log_file_nb:
                    # Use the _writer function which uses os.write, since the
                    # log_file.write method looses data when an EAGAIN occurs.
                    await _writer(log_file, buf)
                else:
                    # For gzip.GzipFile instances, the above _writer function
                    # will not work because data written directly to the file
                    # descriptor bypasses compression.
                    log_file.write(buf)
                    log_file.flush()

    def _io_loop_done(self, future):
        try:
            future.result()
        except asyncio.CancelledError:
            self.cancel()
            self._was_cancelled()
        self.returncode = self.returncode or os.EX_OK
        self._async_wait()

    def _unregister(self):
        if self.input_fd is not None:
            if isinstance(self.input_fd, int):
                os.close(self.input_fd)
            elif not self.input_fd.closed:
                self.scheduler.remove_reader(self.input_fd.fileno())
                self.input_fd.close()
            self.input_fd = None

        if self._io_loop_task is not None:
            if not self.scheduler.is_closed():
                self._io_loop_task.done() or self._io_loop_task.cancel()
            self._io_loop_task = None

        if self.stdout_fd is not None:
            os.close(self.stdout_fd)
            self.stdout_fd = None

        if self._log_file is not None:
            if not self._log_file.closed:
                self.scheduler.remove_writer(self._log_file.fileno())
                self._log_file.close()
            self._log_file = None

        if self._log_file_real is not None:
            # Avoid "ResourceWarning: unclosed file" since python 3.2.
            self._log_file_real.close()
            self._log_file_real = None

        self._registered = False
